package day02.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Flink 流处理 API - transform
 *
 * @author lvbingbing
 * @date 2021-11-19 11:29
 */
public class FlinkTransform01 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 1;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习Map、flatMap、filter
        studyMapFilter(flinkTransform.getInputStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习Map、flatMap、filter
     *
     * @param dataStream <br>
     */
    private static void studyMapFilter(DataStream<String> dataStream) {
        // 1、map，把 String 转换成长度输出
        mapTransform(dataStream);
        // 2、flatMap，按逗号分割字段
        flatMapTransform(dataStream);
        // 3、filter，筛选sensor_1开头的id对应的数据
        filterTransform(dataStream);
    }

    /**
     * 转换算子-map
     *
     * @param dataStream <br>
     */
    private static void mapTransform(DataStream<String> dataStream) {
        //写法一
        DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        mapStream.print("mapStream1");
        // 写法二
        DataStream<Integer> mapStream2 = dataStream.map((MapFunction<String, Integer>) String::length);
        mapStream2.print("mapStream2");
        // 写法三
        DataStream<Integer> mapStream3 = dataStream.map(String::length);
        mapStream3.print("mapStream3");
    }

    /**
     * 转换算子-flatMap
     *
     * @param dataStream <br>
     */
    private static void flatMapTransform(DataStream<String> dataStream) {
        // 写法一
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        flatMapStream.print("flatMapStream");
        // 写法二
        DataStream<String> flatMapStream2 = dataStream.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] split = value.split(",");
            for (String s : split) {
                collector.collect(s);
            }
        }).returns(Types.STRING);
        flatMapStream2.print("flatMapStream2");
        // 写法三
        DataStream<String> flatMapStream3 = dataStream.flatMap(((String value, Collector<String> out) -> {
            String[] split = value.split(",");
            for (String s : split) {
                out.collect(s);
            }
        })).returns(Types.STRING);
        flatMapStream3.print("flatMapStream3");
    }

    /**
     * 转换算子-filter
     *
     * @param dataStream <br>
     */
    private static void filterTransform(DataStream<String> dataStream) {
        // 写法一
        DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
        filterStream.print("filterStream");
        // 写法二
        DataStream<String> filterStream2 = dataStream.filter((FilterFunction<String>) value -> value.startsWith("sensor_1"));
        filterStream2.print("filterStream2");
        // 写法三
        DataStream<String> filterStream3 = dataStream.filter(value -> value.startsWith("sensor_1"));
        filterStream3.print("filterStream3");
    }
}